#
# Watchs out the registry for the octopus.
# 1. Old, and not refreshed entries are removed, and their Ofs processes killed.
# 2. New ofs entries are announced via plumb messages
# 3. Gone ofs entries are announced as gone via plumb messages

implement Watcher;

include "sys.m";
	sys: Sys;
	FD, fprint, sprint, remove, read, sleep, unmount,
	pctl, NEWPGRP, OREAD, dirread, open: import sys;
include "error.m";
	err: Error;
	stderr, checkload, panic, error, kill: import err;
include "arg.m";
	arg: Arg;
	usage: import arg;
include "registries.m";
	regs: Registries;
	Service, Registered, Attributes, Registry: import regs;
include "draw.m";
include "plumbmsg.m";
	plumbmsg: Plumbmsg;
	Msg: import plumbmsg;
include "daytime.m";
	daytime: Daytime;

Watcher: module
{
	init:	fn(nil: ref Draw->Context, nil: list of string);
};

verbose := 0;
debug := 0;
terms: list of (string, int);

dump()
{
	s := "terms: ";
	for (l := terms; l != nil; l = tl l){
		(t, p) := hd l;
		s += sprint("%s:%d ", t, p);
	}
	fprint(stderr, "%s\n", s);
}

newterm(name: string)
{
	text := "arrived: " + "/devs/" + name;
	if (verbose)
		fprint(stderr, "%s\n", text);
	m := ref Msg("ofs", "", "/", "text", nil, array of byte text);
	m.send();
}

goneterm(name: string, pid: int)
{
	text := "gone: " + "/devs/" + name;
	if (verbose)
		fprint(stderr, "%s\n", text);
	m := ref Msg("ofs", "", "/", "text", nil, array of byte text);
	m.send();
	kill(pid, "killgrp");
	unmount(nil, "/devs/" + name);
}

scan(reg: ref Registry)
{
	attrs := list of {("name", "ofs")};
	(svcs, e) := reg.find(attrs);
	if (e != nil){
		fprint(stderr, "watcher: scan: %r");
		return;
	}

	nl : list of (string, int);
	for(l := terms; l != nil; l = tl l){
		(tname, pid) := hd l;
		for(sl := svcs; sl != nil; sl = tl sl){
			s := hd sl;
			if (tname == s.attrs.get("sys"))
				break;
		}
		if (sl == nil)
			goneterm(tname, int pid);
		else
			nl = (tname, pid)::nl;
	}
	terms = nl;

	for(; svcs != nil; svcs = tl svcs){
		s := hd svcs;
		sname := s.attrs.get("sys");
		pid := s.attrs.get("pid");
		for(l = terms; l != nil; l = tl l){
			(tname, nil) := hd l;
			if (sname == tname)
				break;
		}
		if (l == nil && pid != nil){
			newterm(sname);
			terms = (sname, int pid)::terms;
		}
	}
}

# ANY entry that starts with "o!" is potentially gc'd.
collectable(n: string): int
{
	if (len n > 2 && n[0:2] == "o!")
		return 1;
	return 0;
}

dropold(reg: ref Registry, tmout: int)
{
	fd := open(reg.dir, OREAD);
	if (fd == nil)
		panic(sprint("dropold: %s: %r\n", reg.dir));
	now := daytime->now();
	for(;;){
		(n, dirs) := dirread(fd);
		if (n <= 0)
			break;
		for(i := 0; i < n; i++)
			if (dirs[i].name != "event" && dirs[i].name != "find" && dirs[i].name[0] != '.')
			if(collectable(dirs[i].name))
			if (dirs[i].name != "index" && dirs[i].name != "new"){
				if (now - dirs[i].mtime > tmout){
					if (verbose)
						fprint(stderr, "timed out: %s (%d secs)\n", dirs[i].name, now - dirs[i].mtime > tmout );
					remove(reg.dir + "/" + dirs[i].name);
				}
			}
	}
}

watcher(reg: ref Registry, c: chan of int, tmout: int)
{
	scan(reg);
	for(;;){
		if (debug)
			dump();
		<-c;
		if (tmout != 0)
			dropold(reg, tmout);
		scan(reg);
	}
}

events(cfd: ref FD, c: chan of int)
{
	msg := array[30] of byte;
	for(;;){
		nr := read(cfd, msg, len msg);
		if (nr <= 0)
			panic("watcher: event eof");
		c <-= 0;
	}
}

timer(t: int, c: chan of int)
{
	for(;;){
		sleep(t);
		c <-= 0;
	}
}

init(nil: ref Draw->Context, args: list of string)
{
	regdir := "/mnt/registry";
	tick := 60 * 1000;
	tmout := tick;
	sys = load Sys Sys->PATH;
	err = load Error Error->PATH;
	err->init();
	regs = checkload(load Registries Registries->PATH, Registries->PATH);
	regs->init();
	plumbmsg = checkload(load Plumbmsg Plumbmsg->PATH, Plumbmsg->PATH);
	if (plumbmsg->init(1, nil, 0) < 0)
		error(sprint("plumbmsg: %r"));
	daytime = checkload(load Daytime Daytime->PATH, Daytime->PATH);
	arg = checkload(load Arg Arg->PATH, Arg->PATH);
	arg->init(args);
	arg->setusage("watcher [-dv] [-i ms] [-t tmout] [-r regdir]");
	while((opt := arg->opt()) != 0) {
		case opt{
		'd' =>
			debug = verbose = 1;
		'i' =>
			tick = int arg->earg();
			if (tick < 5000)
				tick = 5000;
		't' =>
			tmout = int arg->earg();
		'r' =>
			regdir = arg->earg();
		'v' =>
			verbose = 1;
		* =>
			usage();
		}
	}
	if (tmout != 0 && tmout < 1)
		tmout = 1;
	args = arg->argv();
	if (len args != 0)
		usage();
	reg := Registry.new(regdir);
	if (reg == nil)
		error(sprint("registry: %r"));
	efd := open(regdir + "/event", OREAD);	# open here to abort in parent.
	if (efd == nil)
		error(sprint("no registr event: %r (old inferno?)"));
	c := chan of int;
	pctl(NEWPGRP, nil);
	spawn timer(tick, c);
	spawn events(efd, c);
	spawn watcher(reg, c, tmout);
}
